package logstorage

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/cespare/xxhash/v2"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"

	"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)

// pipeStats processes '| stats ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe
type pipeStats struct {
	// byFields contains field names with optional buckets from 'by(...)' clause.
	byFields []*byStatsField

	// funcs contains stats functions to execute.
	funcs []pipeStatsFunc

	// mode is the execution mode of the pipe (one of the pipeStatsMode* constants).
	//
	// It defines whether the pipe imports already exported stats state from its input
	// and whether it exports stats state instead of the final results.
	mode pipeStatsMode
}

// pipeStatsMode is the execution mode of pipeStats.
//
// The valid values are the pipeStatsMode* constants.
type pipeStatsMode int

// needExportState reports whether psm is pipeStatsModeRemote or pipeStatsModeProxy.
//
// It returns false for any other mode.
func (psm pipeStatsMode) needExportState() bool {
	return psm == pipeStatsModeRemote || psm == pipeStatsModeProxy
}

// needImportState reports whether psm is pipeStatsModeLocal or pipeStatsModeProxy.
//
// It returns false for any other mode.
func (psm pipeStatsMode) needImportState() bool {
	return psm == pipeStatsModeLocal || psm == pipeStatsModeProxy
}

// The supported pipeStats execution modes.
//
//   - pipeStatsModeDefault is formatted as 'stats'; it neither imports nor exports stats state.
//   - pipeStatsModeRemote is formatted as 'stats_remote'; it exports stats state.
//   - pipeStatsModeLocal is formatted as 'stats_local'; it imports stats state.
//   - pipeStatsModeProxy is formatted as 'stats_proxy'; it imports and exports stats state.
const (
	pipeStatsModeDefault = pipeStatsMode(0)
	pipeStatsModeRemote  = pipeStatsMode(1)
	pipeStatsModeLocal   = pipeStatsMode(2)
	pipeStatsModeProxy   = pipeStatsMode(3)
)

// pipeStatsFunc is a single stats function inside pipeStats
// together with its optional 'if' filter and the name of its result.
type pipeStatsFunc struct {
	// f is stats function to execute
	f statsFunc

	// iff is an additional filter, which is applied to results before executing f on them
	//
	// It is nil if the function has no filter.
	iff *ifFilter

	// resultName is the name of the output generated by f
	resultName string
}

// statsFunc is the parsed definition of a stats function.
//
// The calculations are performed by the statsProcessor returned from newStatsProcessor.
type statsFunc interface {
	// String returns string representation of statsFunc
	String() string

	// updateNeededFields must update pf with the fields needed for calculating the given stats
	updateNeededFields(pf *prefixfilter.Filter)

	// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc
	//
	// a must be used for allocating memory inside the returned statsProcessor.
	newStatsProcessor(a *chunkedAllocator) statsProcessor
}

// statsProcessor must process stats for some statsFunc.
//
// All the statsProcessor methods are called from a single goroutine at a time,
// so there is no need for internal synchronization.
//
// sf is passed to the methods that need it, so the implementation doesn't need to keep a reference to statsFunc.
// This allows saving memory when calculating stats over a big number of groups.
type statsProcessor interface {
	// updateStatsForAllRows must update statsProcessor stats for all the rows in br.
	//
	// It must return the change of internal state size in bytes for the statsProcessor.
	//
	// It is guaranteed that br contains at least a single row.
	updateStatsForAllRows(sf statsFunc, br *blockResult) int

	// updateStatsForRow must update statsProcessor stats for the row at rowIndex in br.
	//
	// It must return the change of internal state size in bytes for the statsProcessor.
	updateStatsForRow(sf statsFunc, br *blockResult, rowIndex int) int

	// mergeState must merge sfp state into statsProcessor state.
	//
	// a must be used for allocating memory inside mergeState.
	mergeState(a *chunkedAllocator, sf statsFunc, sfp statsProcessor)

	// exportState must append the statsProcessor state to dst and return the result.
	//
	// The exported state can be imported later via importState.
	exportState(dst []byte, stopCh <-chan struct{}) []byte

	// importState must import the state from src into statsProcessor.
	//
	// src state is obtained via exportState.
	//
	// It must return the internal state size increase after the import.
	importState(src []byte, stopCh <-chan struct{}) (int, error)

	// finalizeStats must append string representation of the collected stats result to dst and return it.
	//
	// finalizeStats must immediately return if stopCh is closed.
	finalizeStats(sf statsFunc, dst []byte, stopCh <-chan struct{}) []byte
}

// String returns the LogsQL representation of ps.
//
// The result starts with the mode-specific keyword, followed by the optional 'by (...)' clause
// and the comma-separated list of stats functions.
// Functions are formatted as 'import_state(name) as name' when the mode imports state.
func (ps *pipeStats) String() string {
	var s string
	switch ps.mode {
	case pipeStatsModeDefault:
		s = "stats"
	case pipeStatsModeRemote:
		s = "stats_remote"
	case pipeStatsModeLocal:
		s = "stats_local"
	case pipeStatsModeProxy:
		s = "stats_proxy"
	default:
		logger.Panicf("BUG: unknown mode: %d", ps.mode)
	}

	if len(ps.byFields) > 0 {
		names := make([]string, 0, len(ps.byFields))
		for _, bf := range ps.byFields {
			names = append(names, bf.String())
		}
		s += " by (" + strings.Join(names, ", ") + ")"
	}

	if len(ps.funcs) == 0 {
		logger.Panicf("BUG: pipeStats must contain at least a single statsFunc")
	}

	importState := ps.mode.needImportState()
	items := make([]string, 0, len(ps.funcs))
	for i := range ps.funcs {
		f := &ps.funcs[i]
		name := quoteTokenIfNeeded(f.resultName)
		if importState {
			items = append(items, fmt.Sprintf("import_state(%s) as %s", name, name))
			continue
		}

		item := f.f.String()
		if f.iff != nil {
			item += " " + f.iff.String()
		}
		items = append(items, item+" as "+name)
	}
	return s + " " + strings.Join(items, ", ")
}

// splitToRemoteAndLocal returns the remote and the local parts of ps.
//
// The remote part is a copy of ps in pipeStatsModeRemote.
// The local part is a copy of ps in pipeStatsModeLocal if ps is in the default mode,
// or in pipeStatsModeProxy if ps is already in the remote mode.
// It panics for the local and proxy modes, since they cannot be split.
func (ps *pipeStats) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
	remote, local := *ps, *ps
	remote.mode = pipeStatsModeRemote

	switch ps.mode {
	case pipeStatsModeDefault:
		local.mode = pipeStatsModeLocal
	case pipeStatsModeRemote:
		local.mode = pipeStatsModeProxy
	case pipeStatsModeLocal:
		logger.Panicf("BUG: stats_local cannot be split")
	case pipeStatsModeProxy:
		logger.Panicf("BUG: stats_proxy cannot be split")
	default:
		logger.Panicf("BUG: unexpected pipeStatsMode: %d", ps.mode)
	}

	return &remote, []pipe{&local}
}

// canLiveTail always returns false for pipeStats.
func (ps *pipeStats) canLiveTail() bool {
	return false
}

// canReturnLastNResults always returns false for pipeStats.
func (ps *pipeStats) canReturnLastNResults() bool {
	return false
}

// updateNeededFields replaces the contents of pf with the fields needed by ps.
//
// Only the functions whose results are matched by the original pf contribute their fields
// (including the fields needed by their 'if' filters).
// The 'by (...)' fields are always added.
//
// The work is delegated to updateNeededFieldsLocal when the mode imports state.
func (ps *pipeStats) updateNeededFields(pf *prefixfilter.Filter) {
	if ps.mode.needImportState() {
		ps.updateNeededFieldsLocal(pf)
		return
	}

	requested := pf.Clone()
	pf.Reset()

	for i := range ps.funcs {
		f := &ps.funcs[i]
		if !requested.MatchString(f.resultName) {
			// The result of f isn't needed by the subsequent pipes.
			continue
		}
		f.f.updateNeededFields(pf)
		if f.iff != nil {
			pf.AddAllowFilters(f.iff.allowFilters)
		}
	}

	// byFields are needed unconditionally, since the output number of rows depends on them.
	for i := range ps.byFields {
		pf.AddAllowFilter(ps.byFields[i].name)
	}
}

// updateNeededFieldsLocal replaces the contents of pf with the 'by (...)' field names
// followed by the result names of all the stats functions.
func (ps *pipeStats) updateNeededFieldsLocal(pf *prefixfilter.Filter) {
	pf.Reset()

	for i := range ps.byFields {
		pf.AddAllowFilter(ps.byFields[i].name)
	}
	for i := range ps.funcs {
		pf.AddAllowFilter(ps.funcs[i].resultName)
	}
}

// hasFilterInWithQuery reports whether the 'if' filter of at least one stats function
// returns true from its hasFilterInWithQuery method.
func (ps *pipeStats) hasFilterInWithQuery() bool {
	for i := range ps.funcs {
		if ps.funcs[i].iff.hasFilterInWithQuery() {
			return true
		}
	}
	return false
}

// initFilterInValues returns a copy of ps where the 'if' filter of every stats function
// is replaced with the result of its initFilterInValues call.
//
// ps itself isn't modified. The first error returned by a filter is returned as is.
func (ps *pipeStats) initFilterInValues(cache *inValuesCache, getFieldValuesFunc getFieldValuesFunc, keepSubquery bool) (pipe, error) {
	funcsNew := make([]pipeStatsFunc, 0, len(ps.funcs))
	for _, f := range ps.funcs {
		// f is a copy of the original entry, so it is safe to modify it.
		iff, err := f.iff.initFilterInValues(cache, getFieldValuesFunc, keepSubquery)
		if err != nil {
			return nil, err
		}
		f.iff = iff
		funcsNew = append(funcsNew, f)
	}

	psNew := *ps
	psNew.funcs = funcsNew
	return &psNew, nil
}

// visitSubqueries passes visitFunc to the 'if' filter of every stats function in ps.
func (ps *pipeStats) visitSubqueries(visitFunc func(q *Query)) {
	for i := range ps.funcs {
		ps.funcs[i].iff.visitSubqueries(visitFunc)
	}
}

// addByTimeField makes sure ps groups results by the '_time' field with the given step bucket.
//
// Every existing '_time' entry in byFields is replaced with a new entry bucketed by step.
// If there is no such entry, a new one is appended to byFields.
//
// The call is a no-op for non-positive step.
func (ps *pipeStats) addByTimeField(step int64) {
	if step <= 0 {
		return
	}

	stepStr := fmt.Sprintf("%d", step)
	newByTime := func() *byStatsField {
		return &byStatsField{
			name:          "_time",
			bucketSizeStr: stepStr,
			bucketSize:    float64(step),
		}
	}

	fields := make([]*byStatsField, 0, len(ps.byFields)+1)
	timeFieldSeen := false
	for _, bf := range ps.byFields {
		if bf.name != "_time" {
			fields = append(fields, bf)
			continue
		}
		fields = append(fields, newByTime())
		timeFieldSeen = true
	}
	if !timeFieldSeen {
		fields = append(fields, newByTime())
	}
	ps.byFields = fields
}

// initRateFuncs sets stepSeconds to step/1e9 for all the statsRate and statsRateSum functions in ps.
//
// The call is a no-op for non-positive step.
func (ps *pipeStats) initRateFuncs(step int64) {
	if step <= 0 {
		return
	}

	seconds := float64(step) / 1e9
	for i := range ps.funcs {
		sf := ps.funcs[i].f
		if sr, ok := sf.(*statsRate); ok {
			sr.stepSeconds = seconds
		} else if srs, ok := sf.(*statsRateSum); ok {
			srs.stepSeconds = seconds
		}
	}
}

// stateSizeBudgetChunk is the number of bytes a shard takes at once from the global state size budget
// in pipeStatsProcessor.writeBlock when its own budget becomes negative.
const stateSizeBudgetChunk = 1 << 20

// newPipeProcessor returns pipeProcessor, which calculates stats for ps and passes the results to ppNext.
//
// The memory available for the stats state is limited by 40% of memory.Allowed().
func (ps *pipeStats) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
	psp := &pipeStatsProcessor{
		ps:          ps,
		concurrency: concurrency,
		stopCh:      stopCh,
		cancel:      cancel,
		ppNext:      ppNext,

		maxStateSize: int64(float64(memory.Allowed()) * 0.4),
	}
	psp.stateSizeBudget.Store(psp.maxStateSize)

	// Every newly created shard must know its parent processor before it is initialized.
	psp.shards.Init = func(shard *pipeStatsProcessorShard) {
		shard.psp = psp
		shard.init()
	}

	return psp
}

// pipeStatsProcessor is the pipeProcessor created by pipeStats.newPipeProcessor.
type pipeStatsProcessor struct {
	// ps is the pipe to execute. The remaining fields in this group
	// are the arguments passed to pipeStats.newPipeProcessor.
	ps          *pipeStats
	concurrency int
	stopCh      <-chan struct{}
	cancel      func()
	ppNext      pipeProcessor

	// shards holds per-worker state; a shard is obtained by workerID in writeBlock.
	shards atomicutil.Slice[pipeStatsProcessorShard]

	// maxStateSize is the initial value of stateSizeBudget in bytes.
	// stateSizeBudget is the remaining global budget; shards take it in stateSizeBudgetChunk portions.
	maxStateSize    int64
	stateSizeBudget atomic.Int64

	// err is the error set via setError under errLock; it is returned from flush.
	errLock sync.Mutex
	err     error
}

// pipeStatsProcessorShard contains the state of pipeStatsProcessor for a single worker.
type pipeStatsProcessorShard struct {
	// psp is the parent processor.
	psp *pipeStatsProcessor

	// groupMap is used for tracking small number of groups until it reaches pipeStatsGroupMapMaxLen.
	// After that the groups are tracked by groupMapShards.
	groupMap pipeStatsGroupMap

	// groupMapShards are used for tracking big number of groups.
	//
	// Every shard contains a share of unique groups, which are merged in parallel at flush().
	groupMapShards []pipeStatsGroupMapShard

	// a is used for reducing memory allocations when calculating stats among big number of different groups.
	a chunkedAllocator

	// bms and brTmp are used for applying per-func filters.
	bms   []bitmap
	brTmp blockResult

	// columnValues and keyBuf are scratch buffers reused between writeBlock calls:
	// columnValues holds per-column values of the current block, keyBuf holds the marshaled group key.
	columnValues [][]string
	keyBuf       []byte

	// stateSizeBudget is the remaining local budget in bytes for the shard state.
	//
	// It is replenished from psp.stateSizeBudget in writeBlock when it becomes negative.
	stateSizeBudget int
}

// pipeStatsGroupMapShard is pipeStatsGroupMap padded to a multiple of the CPU cache line size.
//
// It is the element type of pipeStatsProcessorShard.groupMapShards.
type pipeStatsGroupMapShard struct {
	pipeStatsGroupMap

	// The padding prevents false sharing
	_ [atomicutil.CacheLineSize - unsafe.Sizeof(pipeStatsGroupMap{})%atomicutil.CacheLineSize]byte
}

// pipeStatsGroupMapMaxLen is the maximum number of groups to track in pipeStatsProcessorShard.groupMap
// before switching to pipeStatsProcessorShard.groupMapShards.
//
// Too big a value may slow down flush() across a big number of CPU cores.
// Too small a value may significantly increase RAM usage when stats for a big number of groups are calculated.
const pipeStatsGroupMapMaxLen = 4 << 10

// pipeStatsGroupMap maps 'by (...)' keys to the corresponding pipeStatsGroup entries.
type pipeStatsGroupMap struct {
	// shard is the owner of the map. It is used for creating new groups and for tracking the state size budget.
	shard *pipeStatsProcessorShard

	// u64 holds groups with keys, which are unsigned integers.
	// negative64 holds groups with negative integer keys converted to uint64.
	// strings holds groups with all the other keys.
	u64        map[uint64]*pipeStatsGroup
	negative64 map[uint64]*pipeStatsGroup
	strings    map[string]*pipeStatsGroup
}

// reset drops all the groups from psm and clears the reference to the owner shard.
//
// init must be called before psm can create new groups again.
func (psm *pipeStatsGroupMap) reset() {
	*psm = pipeStatsGroupMap{}
}

// init sets shard as the owner of psm.
func (psm *pipeStatsGroupMap) init(shard *pipeStatsProcessorShard) {
	psm.shard = shard
}

// entriesCount returns the total number of groups stored in psm.
func (psm *pipeStatsGroupMap) entriesCount() uint64 {
	return uint64(len(psm.u64) + len(psm.negative64) + len(psm.strings))
}

// getPipeStatsGroupUint64 returns the group for the key n.
//
// If the group is missing, it is created, registered in psm and the size of the new map entry
// is subtracted from the state size budget of the owner shard.
// The second returned value is true if the group has been created by the call.
func (psm *pipeStatsGroupMap) getPipeStatsGroupUint64(n uint64) (*pipeStatsGroup, bool) {
	psg := psm.u64[n]
	if psg != nil {
		return psg, false
	}

	shard := psm.shard
	psg = shard.newPipeStatsGroup()
	entrySize := psm.setPipeStatsGroupUint64(n, psg)
	shard.stateSizeBudget -= entrySize
	return psg, true
}

// setPipeStatsGroupUint64 stores psg under the key n in psm.u64.
//
// It returns the accounted size in bytes of the added entry: the size of the key plus the size
// of the psg pointer, plus the size of the map header if the map has been created by the call.
func (psm *pipeStatsGroupMap) setPipeStatsGroupUint64(n uint64, psg *pipeStatsGroup) int {
	entrySize := int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
	if psm.u64 != nil {
		psm.u64[n] = psg
		return entrySize
	}

	psm.u64 = map[uint64]*pipeStatsGroup{
		n: psg,
	}
	return int(unsafe.Sizeof(psm.u64)) + entrySize
}

// getPipeStatsGroupNegativeInt64 returns the group for the key n from psm.negative64.
//
// If the group is missing, it is created, registered in psm and the size of the new map entry
// is subtracted from the state size budget of the owner shard.
// The second returned value is true if the group has been created by the call.
func (psm *pipeStatsGroupMap) getPipeStatsGroupNegativeInt64(n int64) (*pipeStatsGroup, bool) {
	psg := psm.negative64[uint64(n)]
	if psg != nil {
		return psg, false
	}

	shard := psm.shard
	psg = shard.newPipeStatsGroup()
	entrySize := psm.setPipeStatsGroupNegativeInt64(n, psg)
	shard.stateSizeBudget -= entrySize
	return psg, true
}

// setPipeStatsGroupNegativeInt64 stores psg under the key uint64(n) in psm.negative64.
//
// It returns the accounted size in bytes of the added entry: the size of the key plus the size
// of the psg pointer, plus the size of the map header if the map has been created by the call.
func (psm *pipeStatsGroupMap) setPipeStatsGroupNegativeInt64(n int64, psg *pipeStatsGroup) int {
	entrySize := int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
	key := uint64(n)
	if psm.negative64 != nil {
		psm.negative64[key] = psg
		return entrySize
	}

	psm.negative64 = map[uint64]*pipeStatsGroup{
		key: psg,
	}
	return int(unsafe.Sizeof(psm.negative64)) + entrySize
}

// getPipeStatsGroupString returns the group for the given key from psm.strings.
//
// If the group is missing, it is created and registered under a copy of key made via the shard allocator.
// The size of the new map entry plus the key length is subtracted from the state size budget of the owner shard.
// The second returned value is true if the group has been created by the call.
func (psm *pipeStatsGroupMap) getPipeStatsGroupString(key []byte) (*pipeStatsGroup, bool) {
	psg := psm.strings[string(key)]
	if psg != nil {
		return psg, false
	}

	shard := psm.shard
	psg = shard.newPipeStatsGroup()
	// key may be overwritten by the caller after the return, so store its copy.
	keyCopy := shard.a.cloneBytesToString(key)
	entrySize := psm.setPipeStatsGroupString(keyCopy, psg)
	shard.stateSizeBudget -= entrySize + len(keyCopy)
	return psg, true
}

// setPipeStatsGroupString stores psg under the key v in psm.strings.
//
// It returns the accounted size in bytes of the added entry: the size of the string header for the key
// plus the size of the psg pointer, plus the size of the map header if the map has been created by the call.
// The bytes referred by v aren't included into the returned size - the caller must account them separately.
//
// The size of the psg pointer is included for consistency with setPipeStatsGroupUint64
// and setPipeStatsGroupNegativeInt64; otherwise the memory occupied by string-keyed groups is underestimated.
func (psm *pipeStatsGroupMap) setPipeStatsGroupString(v string, psg *pipeStatsGroup) int {
	if psm.strings == nil {
		psm.strings = map[string]*pipeStatsGroup{
			v: psg,
		}
		return int(unsafe.Sizeof(psm.strings) + unsafe.Sizeof(v) + unsafe.Sizeof(psg))
	}
	psm.strings[v] = psg
	return int(unsafe.Sizeof(v) + unsafe.Sizeof(psg))
}

// mergeState merges all the groups from src into psm.
//
// Groups missing in psm are moved from src by reference; groups existing in both maps
// are merged via pipeStatsGroup.mergeState with the allocator a.
//
// The merge is interrupted as soon as stopCh is closed.
func (psm *pipeStatsGroupMap) mergeState(a *chunkedAllocator, src *pipeStatsGroupMap, stopCh <-chan struct{}) {
	for n, psgSrc := range src.u64 {
		if needStop(stopCh) {
			return
		}
		if psgDst := psm.u64[n]; psgDst != nil {
			psgDst.mergeState(a, psgSrc)
			continue
		}
		psm.setPipeStatsGroupUint64(n, psgSrc)
	}

	for n, psgSrc := range src.negative64 {
		if needStop(stopCh) {
			return
		}
		if psgDst := psm.negative64[n]; psgDst != nil {
			psgDst.mergeState(a, psgSrc)
			continue
		}
		psm.setPipeStatsGroupNegativeInt64(int64(n), psgSrc)
	}

	for k, psgSrc := range src.strings {
		if needStop(stopCh) {
			return
		}
		if psgDst := psm.strings[k]; psgDst != nil {
			psgDst.mergeState(a, psgSrc)
			continue
		}
		psm.setPipeStatsGroupString(k, psgSrc)
	}
}

// initStatsConcurrency sets the concurrency field of sfp to the given value
// if sfp is one of the processors, which have this field:
// statsCountUniqProcessor, statsCountUniqHashProcessor or statsUniqValuesProcessor.
//
// Other processors are left untouched.
func initStatsConcurrency(sfp statsProcessor, concurrency uint) {
	if p, ok := sfp.(*statsCountUniqProcessor); ok {
		p.concurrency = concurrency
		return
	}
	if p, ok := sfp.(*statsCountUniqHashProcessor); ok {
		p.concurrency = concurrency
		return
	}
	if p, ok := sfp.(*statsUniqValuesProcessor); ok {
		p.concurrency = concurrency
	}
}

// init prepares shard for work: it allocates a bitmap per stats function
// and makes shard the owner of shard.groupMap.
//
// shard.psp must be set before calling init.
func (shard *pipeStatsProcessorShard) init() {
	shard.bms = make([]bitmap, len(shard.psp.ps.funcs))
	shard.groupMap.init(shard)
}

// newPipeStatsGroup creates a new group with a fresh statsProcessor per every stats function.
//
// All the memory is allocated via shard.a; the number of bytes allocated during the call
// is subtracted from shard.stateSizeBudget.
func (shard *pipeStatsProcessorShard) newPipeStatsGroup() *pipeStatsGroup {
	a := &shard.a
	funcs := shard.psp.ps.funcs
	concurrency := uint(shard.psp.concurrency)

	allocatedBefore := a.bytesAllocated

	sfps := a.newStatsProcessors(uint(len(funcs)))
	for i := range funcs {
		sfp := funcs[i].f.newStatsProcessor(a)
		initStatsConcurrency(sfp, concurrency)
		sfps[i] = sfp
	}

	psg := a.newPipeStatsGroup()
	psg.funcs = funcs
	psg.sfps = sfps

	shard.stateSizeBudget -= a.bytesAllocated - allocatedBefore

	return psg
}

// writeBlockDefault updates per-group stats with the rows from br.
//
// It is called from pipeStatsProcessor.writeBlock when the pipe mode doesn't need importing state.
// Rows are grouped by the values of the 'by (...)' fields. The state size changes reported
// by the stats processors are subtracted from shard.stateSizeBudget.
func (shard *pipeStatsProcessorShard) writeBlockDefault(br *blockResult) {
	byFields := shard.psp.ps.byFields

	// Update shard.bms by applying per-function filters
	shard.applyPerFunctionFilters(br)

	// Process stats for the defined functions
	if len(byFields) == 0 {
		// Fast path - pass all the rows to a single group with empty key.
		psg := shard.getPipeStatsGroupString(nil)
		shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
		return
	}
	if len(byFields) == 1 {
		// Special case for grouping by a single column.
		shard.updateStatsSingleColumn(br, byFields[0])
		return
	}

	// Obtain columns for byFields
	columnValues := slicesutil.SetLength(shard.columnValues, len(byFields))
	for i, bf := range byFields {
		c := br.getColumnByName(bf.name)
		if bf.hasBucketConfig() {
			columnValues[i] = c.getValuesBucketed(br, bf)
		} else {
			columnValues[i] = c.getValues(br)
		}
	}
	// Store the slice back, so its capacity is reused by the next call.
	shard.columnValues = columnValues

	// Verify whether all the 'by (...)' columns are constant.
	areAllConstColumns := true
	for _, values := range columnValues {
		if !areConstValues(values) {
			areAllConstColumns = false
			break
		}
	}
	if areAllConstColumns {
		// Fast path for constant 'by (...)' columns.
		// The group key is the concatenation of the marshaled values of all the 'by (...)' fields.
		keyBuf := shard.keyBuf[:0]
		for _, values := range columnValues {
			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
		}
		psg := shard.getPipeStatsGroupString(keyBuf)
		shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
		shard.keyBuf = keyBuf
		return
	}

	// The slowest path - group by multiple columns with different values across rows.
	//
	// psg is kept between iterations, so the group lookup is skipped
	// for adjacent rows with identical 'by (...)' values.
	var psg *pipeStatsGroup
	keyBuf := shard.keyBuf[:0]
	for i := 0; i < br.rowsLen; i++ {
		// Verify whether the key for 'by (...)' fields equals the previous key
		sameValue := i > 0
		for _, values := range columnValues {
			if i <= 0 || values[i-1] != values[i] {
				sameValue = false
				break
			}
		}
		if !sameValue {
			// Construct new key for the 'by (...)' fields
			keyBuf = keyBuf[:0]
			for _, values := range columnValues {
				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
			}
			psg = shard.getPipeStatsGroupString(keyBuf)
		}
		shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
	}
	// Store the buffer back, so its capacity is reused by the next call.
	shard.keyBuf = keyBuf
}

// writeBlockLocal imports the exported stats state from the rows in br.
//
// It is called from pipeStatsProcessor.writeBlock when the pipe mode needs importing state.
// The first len(byFields) columns of br are treated as the values of the 'by (...)' fields,
// while the remaining columns are treated as the exported states of the stats functions.
//
// The first encountered error is registered via psp.setError and stops the processing of br.
func (shard *pipeStatsProcessorShard) writeBlockLocal(br *blockResult) {
	psp := shard.psp
	stopCh := psp.stopCh
	byFieldsLen := len(psp.ps.byFields)

	cs := br.getColumns()
	shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs))
	allValues := shard.columnValues
	for i, c := range cs {
		allValues[i] = c.getValues(br)
	}
	if minColumns := byFieldsLen + 1; len(allValues) < minColumns {
		psp.setError(fmt.Errorf("at least %d columns must exist; got %d columns only", minColumns, len(allValues)))
		return
	}
	byFieldValues, stateValues := allValues[:byFieldsLen], allValues[byFieldsLen:]

	// importRow imports the state at the given row into psg and updates the state size budget.
	// It returns false if the import failed; the error is already registered in this case.
	importRow := func(psg *pipeStatsGroup, rowIdx int) bool {
		stateSize, err := psg.importStateFromRow(stateValues, rowIdx, stopCh)
		if err != nil {
			psp.setError(err)
			return false
		}
		shard.stateSizeBudget -= stateSize
		return true
	}

	switch byFieldsLen {
	case 0:
		// Global stats - the block must contain a single row with the state.
		if br.rowsLen != 1 {
			psp.setError(fmt.Errorf("global stats must have only a single row; got %d rows", br.rowsLen))
			return
		}
		importRow(shard.getPipeStatsGroupString(nil), 0)
	case 1:
		// Grouping by a single field - the field value is used as a group key as is.
		values := byFieldValues[0]
		for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
			psg := shard.getPipeStatsGroupGeneric(values[rowIdx])
			if !importRow(psg, rowIdx) || needStop(stopCh) {
				return
			}
		}
	default:
		// Grouping by multiple fields - the group key is built from the marshaled field values.
		keyBuf := shard.keyBuf
		for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
			keyBuf = keyBuf[:0]
			for _, values := range byFieldValues {
				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[rowIdx]))
			}
			psg := shard.getPipeStatsGroupString(keyBuf)
			if !importRow(psg, rowIdx) {
				return
			}
			if needStop(stopCh) {
				break
			}
		}
		shard.keyBuf = keyBuf
	}
}

// updateStatsSingleColumn updates per-group stats with the rows from br grouped by the single field bf.
//
// It contains fast paths for const columns, for columns which become const after bucketing
// and for columns with integer value types. The state size changes reported by the stats processors
// are subtracted from shard.stateSizeBudget.
//
// In all the per-row loops below the group is looked up only when the value differs from the value
// at the previous row, so runs of identical values share a single lookup.
func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, bf *byStatsField) {
	c := br.getColumnByName(bf.name)
	if c.isConst {
		// Fast path for column with a constant value.
		v := c.valuesEncoded[0]
		if bf.hasBucketConfig() {
			v = br.getBucketedValue(c.valuesEncoded[0], bf)
		}
		psg := shard.getPipeStatsGroupGeneric(v)
		shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
		return
	}

	if bf.hasBucketConfig() {
		values := c.getValuesBucketed(br, bf)
		if areConstValues(values) {
			// Fast path - values are constant after bucketing.
			psg := shard.getPipeStatsGroupGeneric(values[0])
			shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
			return
		}

		var psg *pipeStatsGroup
		for i := 0; i < br.rowsLen; i++ {
			if i <= 0 || values[i-1] != values[i] {
				psg = shard.getPipeStatsGroupGeneric(values[i])
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	}

	// Fast paths for integer columns - the encoded values are unmarshaled directly into integer group keys.
	switch c.valueType {
	case valueTypeUint8:
		var psg *pipeStatsGroup
		values := c.getValuesEncoded(br)
		for i, v := range values {
			if i <= 0 || values[i-1] != v {
				n := unmarshalUint8(v)
				psg = shard.getPipeStatsGroupUint64(uint64(n))
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	case valueTypeUint16:
		var psg *pipeStatsGroup
		values := c.getValuesEncoded(br)
		for i, v := range values {
			if i <= 0 || values[i-1] != v {
				n := unmarshalUint16(v)
				psg = shard.getPipeStatsGroupUint64(uint64(n))
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	case valueTypeUint32:
		var psg *pipeStatsGroup
		values := c.getValuesEncoded(br)
		for i, v := range values {
			if i <= 0 || values[i-1] != v {
				n := unmarshalUint32(v)
				psg = shard.getPipeStatsGroupUint64(uint64(n))
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	case valueTypeUint64:
		var psg *pipeStatsGroup
		values := c.getValuesEncoded(br)
		for i, v := range values {
			if i <= 0 || values[i-1] != v {
				n := unmarshalUint64(v)
				psg = shard.getPipeStatsGroupUint64(n)
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	case valueTypeInt64:
		var psg *pipeStatsGroup
		values := c.getValuesEncoded(br)
		for i, v := range values {
			if i <= 0 || values[i-1] != v {
				n := unmarshalInt64(v)
				psg = shard.getPipeStatsGroupInt64(n)
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		return
	}

	// Generic path for a column with different values.
	values := c.getValues(br)

	var psg *pipeStatsGroup
	for i := 0; i < br.rowsLen; i++ {
		if i <= 0 || values[i-1] != values[i] {
			psg = shard.getPipeStatsGroupGeneric(values[i])
		}
		shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
	}
}

// applyPerFunctionFilters initializes shard.bms[i] for every stats function with an 'if' filter.
//
// The bitmap is sized to the number of rows in br with all the bits set,
// and then the filter is applied to br with this bitmap.
// Bitmaps for functions without 'if' filters are left untouched.
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
	funcs := shard.psp.ps.funcs
	for i := range funcs {
		if iff := funcs[i].iff; iff != nil {
			bm := &shard.bms[i]
			bm.init(br.rowsLen)
			bm.setBits()
			iff.f.applyToBlockResult(br, bm)
		}
	}
}

// getPipeStatsGroupGeneric returns the group for the key v.
//
// The key is stored as an unsigned integer if tryParseUint64 succeeds for v,
// as a negative integer if v starts with '-' and tryParseInt64 succeeds for it,
// and as a string otherwise.
func (shard *pipeStatsProcessorShard) getPipeStatsGroupGeneric(v string) *pipeStatsGroup {
	n, ok := tryParseUint64(v)
	if ok {
		return shard.getPipeStatsGroupUint64(n)
	}

	if v != "" && v[0] == '-' {
		nn, ok := tryParseInt64(v)
		if ok {
			return shard.getPipeStatsGroupNegativeInt64(nn)
		}
	}

	return shard.getPipeStatsGroupString(bytesutil.ToUnsafeBytes(v))
}

// getPipeStatsGroupInt64 returns the group for the key n.
//
// Negative keys are tracked separately from non-negative keys,
// which are handled as unsigned integers.
func (shard *pipeStatsProcessorShard) getPipeStatsGroupInt64(n int64) *pipeStatsGroup {
	if n < 0 {
		return shard.getPipeStatsGroupNegativeInt64(n)
	}
	return shard.getPipeStatsGroupUint64(uint64(n))
}

// getPipeStatsGroupUint64 returns the group for the unsigned integer key n, creating it if needed.
//
// The group is looked up in shard.groupMapShards if they are already in use.
// Otherwise it is looked up in shard.groupMap, which may be moved to groupMapShards
// after a new group is added to it.
func (shard *pipeStatsProcessorShard) getPipeStatsGroupUint64(n uint64) *pipeStatsGroup {
	if shard.groupMapShards != nil {
		psg, _ := shard.getGroupMapShardByUint64(n).getPipeStatsGroupUint64(n)
		return psg
	}

	psg, isNew := shard.groupMap.getPipeStatsGroupUint64(n)
	if isNew {
		shard.probablyMoveGroupMapToShards(&shard.a)
	}
	return psg
}

// getPipeStatsGroupNegativeInt64 returns the group for the negative integer key n, creating it if needed.
//
// The group is looked up in shard.groupMapShards if they are already in use.
// Otherwise it is looked up in shard.groupMap, which may be moved to groupMapShards
// after a new group is added to it.
func (shard *pipeStatsProcessorShard) getPipeStatsGroupNegativeInt64(n int64) *pipeStatsGroup {
	if shard.groupMapShards != nil {
		psg, _ := shard.getGroupMapShardByUint64(uint64(n)).getPipeStatsGroupNegativeInt64(n)
		return psg
	}

	psg, isNew := shard.groupMap.getPipeStatsGroupNegativeInt64(n)
	if isNew {
		shard.probablyMoveGroupMapToShards(&shard.a)
	}
	return psg
}

// getPipeStatsGroupString returns the group for the string key v, creating it if needed.
//
// The group is looked up in shard.groupMapShards if they are already in use.
// Otherwise it is looked up in shard.groupMap, which may be moved to groupMapShards
// after a new group is added to it.
func (shard *pipeStatsProcessorShard) getPipeStatsGroupString(v []byte) *pipeStatsGroup {
	if shard.groupMapShards != nil {
		psg, _ := shard.getGroupMapShardByString(v).getPipeStatsGroupString(v)
		return psg
	}

	psg, isNew := shard.groupMap.getPipeStatsGroupString(v)
	if isNew {
		shard.probablyMoveGroupMapToShards(&shard.a)
	}
	return psg
}

// probablyMoveGroupMapToShards moves shard.groupMap to shard.groupMapShards
// if the number of groups in it reaches pipeStatsGroupMapMaxLen.
//
// a is used for allocating groupMapShards.
func (shard *pipeStatsProcessorShard) probablyMoveGroupMapToShards(a *chunkedAllocator) {
	if shard.groupMap.entriesCount() >= pipeStatsGroupMapMaxLen {
		shard.moveGroupMapToShards(a)
	}
}

// moveGroupMapToShards distributes all the groups from shard.groupMap among newly allocated
// shard.groupMapShards and resets shard.groupMap.
//
// The number of groupMapShards equals to psp.concurrency. They are allocated via a;
// the allocated bytes are subtracted from shard.stateSizeBudget.
func (shard *pipeStatsProcessorShard) moveGroupMapToShards(a *chunkedAllocator) {
	allocatedBefore := a.bytesAllocated
	groupMapShards := a.newPipeStatsGroupMapShards(uint(shard.psp.concurrency))
	shard.stateSizeBudget -= a.bytesAllocated - allocatedBefore

	for i := range groupMapShards {
		groupMapShards[i].init(shard)
	}
	// groupMapShards must be set before the getGroupMapShardBy* calls below, since they rely on it.
	shard.groupMapShards = groupMapShards

	src := &shard.groupMap
	for n, psg := range src.u64 {
		shard.getGroupMapShardByUint64(n).setPipeStatsGroupUint64(n, psg)
	}
	for n, psg := range src.negative64 {
		shard.getGroupMapShardByUint64(n).setPipeStatsGroupNegativeInt64(int64(n), psg)
	}
	for s, psg := range src.strings {
		shard.getGroupMapShardByString(bytesutil.ToUnsafeBytes(s)).setPipeStatsGroupString(s, psg)
	}

	src.reset()
}

// getGroupMapShardByString returns the map from shard.groupMapShards responsible for the key v.
//
// The map is selected by the xxhash of v modulo the number of groupMapShards.
func (shard *pipeStatsProcessorShard) getGroupMapShardByString(v []byte) *pipeStatsGroupMap {
	idx := xxhash.Sum64(v) % uint64(len(shard.groupMapShards))
	return &shard.groupMapShards[idx].pipeStatsGroupMap
}

// getGroupMapShardByUint64 returns the map from shard.groupMapShards responsible for the key n.
//
// The map is selected by fastHashUint64(n) modulo the number of groupMapShards.
func (shard *pipeStatsProcessorShard) getGroupMapShardByUint64(n uint64) *pipeStatsGroupMap {
	idx := fastHashUint64(n) % uint64(len(shard.groupMapShards))
	return &shard.groupMapShards[idx].pipeStatsGroupMap
}

// pipeStatsGroup holds the stats state for a single group of rows with identical 'by (...)' values.
type pipeStatsGroup struct {
	// funcs are the stats functions of the pipe.
	// sfps[i] is the stats processor holding the state for funcs[i].
	funcs []pipeStatsFunc
	sfps  []statsProcessor
}

// importStateFromRow imports the exported states located at rowIdx of columnValues into psg.
//
// columnValues[i] must contain the exported states for psg.sfps[i],
// so the number of columns must match the number of stats processors.
//
// It returns the total increase of the state size reported by the stats processors.
func (psg *pipeStatsGroup) importStateFromRow(columnValues [][]string, rowIdx int, stopCh <-chan struct{}) (int, error) {
	sfps := psg.sfps
	if got, want := len(columnValues), len(sfps); got != want {
		return 0, fmt.Errorf("unexpected number of columns; got %d; want %d", got, want)
	}

	total := 0
	for i := range sfps {
		state := bytesutil.ToUnsafeBytes(columnValues[i][rowIdx])
		size, err := sfps[i].importState(state, stopCh)
		if err != nil {
			return 0, fmt.Errorf("cannot import state for %s: %w", psg.funcs[i].f, err)
		}
		total += size
	}
	return total, nil
}

// mergeState merges the state of every stats processor from src into the corresponding stats processor in psg.
//
// a is passed to the stats processors for memory allocations during the merge.
func (psg *pipeStatsGroup) mergeState(a *chunkedAllocator, src *pipeStatsGroup) {
	for i := range psg.sfps {
		psg.sfps[i].mergeState(a, psg.funcs[i].f, src.sfps[i])
	}
}

// updateStatsForAllRows updates all the stats processors in psg with all the rows from br.
//
// For a function with an 'if' filter the rows are first filtered by bms[i] into brTmp,
// and the stats processor is updated only if at least a single row remains.
//
// It returns the total change of the state size reported by the stats processors.
func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int {
	total := 0
	for i := range psg.sfps {
		sfp, f := psg.sfps[i], &psg.funcs[i]
		if f.iff == nil {
			total += sfp.updateStatsForAllRows(f.f, br)
			continue
		}

		brTmp.initFromFilterAllColumns(br, &bms[i])
		if brTmp.rowsLen <= 0 {
			continue
		}
		total += sfp.updateStatsForAllRows(f.f, brTmp)
	}
	return total
}

// updateStatsForRow updates the stats processors in psg with the row at rowIdx in br.
//
// A stats processor for a function with an 'if' filter is updated only if the bit at rowIdx is set in bms[i].
//
// It returns the total change of the state size reported by the stats processors.
func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int {
	total := 0
	for i := range psg.sfps {
		f := &psg.funcs[i]
		if f.iff != nil && !bms[i].isSetBit(rowIdx) {
			// The row doesn't match the 'if' filter.
			continue
		}
		total += psg.sfps[i].updateStatsForRow(f.f, br, rowIdx)
	}
	return total
}

// setError stores err in psp under errLock and then calls psp.cancel.
//
// A previously stored error is overwritten, so flush returns the most recently set error.
func (psp *pipeStatsProcessor) setError(err error) {
	psp.errLock.Lock()
	psp.err = err
	psp.errLock.Unlock()
	// cancel is called after releasing the lock.
	psp.cancel()
}

// writeBlock updates the stats state of the shard for the given workerID with the rows from br.
//
// Empty blocks are ignored. The block is dropped if the global state size budget is exhausted;
// flush returns an error in this case.
func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := psp.shards.Get(workerID)

	for shard.stateSizeBudget < 0 {
		// steal some budget for the state size from the global budget.
		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining >= 0 {
			shard.stateSizeBudget += stateSizeBudgetChunk
			continue
		}

		// The state size is too big. Stop processing data in order to avoid OOM crash.
		if remaining >= -stateSizeBudgetChunk {
			// The global budget was non-negative before this call, so this is the first caller
			// to exhaust it. Notify worker goroutines to stop calling writeBlock()
			// in order to save CPU time.
			psp.cancel()
		}
		return
	}

	if !psp.ps.mode.needImportState() {
		shard.writeBlockDefault(br)
		return
	}
	shard.writeBlockLocal(br)
}

// flush merges the per-worker states and writes the resulting stats to the next pipe.
//
// It returns the error registered via setError, or an error if the state size budget is exhausted.
// It returns nil without writing all the results if stopCh is closed.
func (psp *pipeStatsProcessor) flush() error {
	if err := psp.err; err != nil {
		return err
	}

	if psp.stateSizeBudget.Load() <= 0 {
		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
	}

	// Merge states across shards in parallel
	psms := psp.mergeShardsParallel()
	if needStop(psp.stopCh) {
		return nil
	}

	if len(psms) == 0 && len(psp.ps.byFields) == 0 {
		// Special case - zero matching rows.
		// Create a single empty group, so the stats without 'by (...)' return a row.
		shard := psp.shards.Get(0)
		shard.init()
		shard.groupMap.getPipeStatsGroupString(nil)
		psms = append(psms, &shard.groupMap)
	}

	// Write the calculated stats in parallel to the next pipe.
	var wg sync.WaitGroup
	wg.Add(len(psms))
	for i, psm := range psms {
		go func(workerID uint, psm *pipeStatsGroupMap) {
			defer wg.Done()

			psw := newPipeStatsWriter(psp, workerID)
			psw.writeShardData(psm)
			psw.flush()
		}(uint(i), psm)
	}
	wg.Wait()

	return nil
}

// pipeStatsWriter writes the calculated stats for groups to psp.ppNext in blocks.
type pipeStatsWriter struct {
	// psp is the parent processor; workerID is passed to psp.ppNext.writeBlock.
	psp      *pipeStatsProcessor
	workerID uint

	// rcs accumulate the pending rows - a column per 'by (...)' field followed by a column per stats function.
	// br is the block passed to the next pipe at flush.
	rcs []resultColumn
	br  blockResult

	// resultLen is the total length of the values in the pending rows.
	// rowsCount is the number of the pending rows.
	resultLen int
	rowsCount int

	// values holds the values for the row being constructed.
	// valuesBuf backs the values generated for the pending rows.
	values    []string
	valuesBuf []byte
}

// newPipeStatsWriter returns a writer, which passes the results to psp.ppNext on behalf of the given workerID.
//
// The result columns consist of the 'by (...)' fields followed by the results of the stats functions.
func newPipeStatsWriter(psp *pipeStatsProcessor, workerID uint) *pipeStatsWriter {
	ps := psp.ps

	rcs := make([]resultColumn, 0, len(ps.byFields)+len(ps.funcs))
	for i := range ps.byFields {
		rcs = appendResultColumnWithName(rcs, ps.byFields[i].name)
	}
	for i := range ps.funcs {
		rcs = appendResultColumnWithName(rcs, ps.funcs[i].resultName)
	}

	return &pipeStatsWriter{
		psp:      psp,
		workerID: workerID,
		rcs:      rcs,
	}
}

// writePipeStatsGroup adds a row with the results for psg to the pending rows.
//
// psw.values must already contain the values for the 'by (...)' fields.
// The results of the stats functions are appended to them: the exported state
// if the mode needs exporting state, or the finalized stats otherwise.
//
// The pending rows are flushed when their total length reaches 64_000 bytes.
func (psw *pipeStatsWriter) writePipeStatsGroup(psg *pipeStatsGroup) {
	stopCh := psw.psp.stopCh
	exportState := psw.psp.ps.mode.needExportState()

	for i, sfp := range psg.sfps {
		start := len(psw.valuesBuf)
		if exportState {
			psw.valuesBuf = sfp.exportState(psw.valuesBuf, stopCh)
		} else {
			psw.valuesBuf = sfp.finalizeStats(psg.funcs[i].f, psw.valuesBuf, stopCh)
		}
		psw.values = append(psw.values, bytesutil.ToUnsafeString(psw.valuesBuf[start:]))
	}
	if len(psw.values) != len(psw.rcs) {
		logger.Panicf("BUG: len(values)=%d must be equal to len(rcs)=%d", len(psw.values), len(psw.rcs))
	}

	rowLen := 0
	for i, v := range psw.values {
		psw.rcs[i].addValue(v)
		rowLen += len(v)
	}
	psw.resultLen += rowLen
	psw.rowsCount++

	// The 64_000 limit provides the best performance results when generating stats
	// over big number of distinct groups.
	if psw.resultLen < 64_000 {
		return
	}
	psw.flush()
}

// flush passes the pending rows to the next pipe and resets psw for collecting new rows.
func (psw *pipeStatsWriter) flush() {
	rowsCount := psw.rowsCount
	psw.rowsCount = 0
	psw.resultLen = 0

	psw.br.setResultColumns(psw.rcs, rowsCount)
	psw.psp.ppNext.writeBlock(psw.workerID, &psw.br)

	// Release the written data, while keeping the allocated buffers for reuse.
	psw.br.reset()
	for i := range psw.rcs {
		psw.rcs[i].resetValues()
	}
	psw.values, psw.valuesBuf = psw.values[:0], psw.valuesBuf[:0]
}

// writeShardData writes the results for all the groups in psm via writePipeStatsGroup.
//
// When grouping by a single field, the field value is reconstructed from the integer or string group key.
// Otherwise the values for the 'by (...)' fields are unmarshaled from the string group key.
//
// The writing is interrupted as soon as stopCh is closed.
func (psw *pipeStatsWriter) writeShardData(psm *pipeStatsGroupMap) {
	stopCh := psw.psp.stopCh
	byFieldsLen := len(psw.psp.ps.byFields)

	if byFieldsLen != 1 {
		for key, psg := range psm.strings {
			if needStop(stopCh) {
				return
			}
			psw.values = psw.values[:0]

			// Unmarshal values for byFields from key.
			rest := bytesutil.ToUnsafeBytes(key)
			for len(rest) > 0 {
				v, nSize := encoding.UnmarshalBytes(rest)
				if nSize <= 0 {
					logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", rest)
				}
				rest = rest[nSize:]
				psw.values = append(psw.values, bytesutil.ToUnsafeString(v))
			}
			if len(psw.values) != byFieldsLen {
				logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(psw.values), byFieldsLen)
			}

			psw.writePipeStatsGroup(psg)
		}
		return
	}

	// Grouping by a single field - reconstruct its value from the group key.
	for n, psg := range psm.u64 {
		if needStop(stopCh) {
			return
		}
		start := len(psw.valuesBuf)
		psw.valuesBuf = marshalUint64String(psw.valuesBuf, n)
		psw.values = append(psw.values[:0], bytesutil.ToUnsafeString(psw.valuesBuf[start:]))
		psw.writePipeStatsGroup(psg)
	}
	for n, psg := range psm.negative64 {
		if needStop(stopCh) {
			return
		}
		start := len(psw.valuesBuf)
		psw.valuesBuf = marshalInt64String(psw.valuesBuf, int64(n))
		psw.values = append(psw.values[:0], bytesutil.ToUnsafeString(psw.valuesBuf[start:]))
		psw.writePipeStatsGroup(psg)
	}
	for key, psg := range psm.strings {
		if needStop(stopCh) {
			return
		}
		psw.values = append(psw.values[:0], key)
		psw.writePipeStatsGroup(psg)
	}
}

// mergeShardsParallel merges the per-shard state of psp into the groupMapShards of the first shard
// and returns the resulting maps, which contain at least one entry.
//
// The merge is performed in two concurrent steps:
//
//   - moveGroupMapToShards is called for every shard with nil groupMapShards;
//   - for every index i, the i-th groupMapShards entries of the remaining shards
//     are merged into the i-th entry of the first shard and are reset after that.
//
// nil is returned if there are no shards or if needStop(psp.stopCh) returns true after any step.
func (psp *pipeStatsProcessor) mergeShardsParallel() []*pipeStatsGroupMap {
	allShards := psp.shards.All()
	if len(allShards) == 0 {
		return nil
	}

	var wg sync.WaitGroup

	// Step 1: make sure every shard has groupMapShards.
	for _, shard := range allShards {
		if shard.groupMapShards == nil {
			wg.Add(1)
			go func() {
				defer wg.Done()

				var a chunkedAllocator
				shard.moveGroupMapToShards(&a)
			}()
		}
	}
	wg.Wait()
	if needStop(psp.stopCh) {
		return nil
	}

	// Step 2: merge the remaining shards into the first one.
	// Every goroutine works with its own index, so goroutines do not share maps.
	dst := allShards[0].groupMapShards
	others := allShards[1:]
	for i := range dst {
		wg.Add(1)
		go func(cpuIdx int) {
			defer wg.Done()

			var a chunkedAllocator
			target := &dst[cpuIdx].pipeStatsGroupMap
			for _, shard := range others {
				src := &shard.groupMapShards[cpuIdx].pipeStatsGroupMap
				target.mergeState(&a, src, psp.stopCh)
				src.reset()
			}
		}(i)
	}
	wg.Wait()
	if needStop(psp.stopCh) {
		return nil
	}

	// Return only the maps with entries.
	nonEmpty := make([]*pipeStatsGroupMap, 0, len(dst))
	for i := range dst {
		if hasEntries := dst[i].entriesCount() > 0; hasEntries {
			nonEmpty = append(nonEmpty, &dst[i].pipeStatsGroupMap)
		}
	}
	return nonEmpty
}

// parsePipeStats parses the stats pipe, which must start with the 'stats' or 'stats_remote' keyword.
func parsePipeStats(lex *lexer) (pipe, error) {
	const needStatsKeyword = true
	return parsePipeStatsExt(lex, needStatsKeyword)
}

// parsePipeStatsNoStatsKeyword parses the stats pipe without the leading 'stats' keyword.
func parsePipeStatsNoStatsKeyword(lex *lexer) (pipe, error) {
	const needStatsKeyword = false
	return parsePipeStatsExt(lex, needStatsKeyword)
}

// parsePipeStatsExt parses the stats pipe from lex.
//
// If needStatsKeyword is set, then the pipe must start with either 'stats' or 'stats_remote' keyword.
// The 'stats_remote' keyword switches the pipe mode to pipeStatsModeRemote.
//
// The optional 'by(...)' clause (the 'by' keyword itself may be omitted) is followed
// by comma-separated list of stats functions. Every stats function may be followed
// by the optional 'if (...)' filter and by the optional result name with the optional 'as' prefix.
// If the result name is missing, then it is built from the string representation
// of the stats function and its 'if' filter.
//
// An error is returned if the result name clashes with a 'by' field or with another result name.
func parsePipeStatsExt(lex *lexer, needStatsKeyword bool) (pipe, error) {
	ps := &pipeStats{}

	if needStatsKeyword {
		switch {
		case lex.isKeyword("stats"):
			ps.mode = pipeStatsModeDefault
		case lex.isKeyword("stats_remote"):
			ps.mode = pipeStatsModeRemote
		default:
			return nil, fmt.Errorf("expecting 'stats' or 'stats_remote'; got %q", lex.token)
		}
		lex.nextToken()
	}

	if lex.isKeyword("by", "(") {
		if lex.isKeyword("by") {
			lex.nextToken()
		}
		byFields, err := parseByStatsFields(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
		}
		ps.byFields = byFields
	}

	// byFieldsByName and usedResultNames are used for detecting result name clashes.
	byFieldsByName := make(map[string]*byStatsField, len(ps.byFields))
	for _, bf := range ps.byFields {
		byFieldsByName[bf.name] = bf
	}
	usedResultNames := make(map[string]statsFunc)

	var funcs []pipeStatsFunc
	for {
		sf, err := parseStatsFunc(lex)
		if err != nil {
			return nil, err
		}

		var iff *ifFilter
		if lex.isKeyword("if") {
			iff, err = parseIfFilter(lex)
			if err != nil {
				return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err)
			}
		}

		var resultName string
		if lex.isKeyword(",", "|", ")", "") {
			// The result name is missing. Generate it from the stats function and its filter.
			resultName = sf.String()
			if iff != nil {
				resultName += " " + iff.String()
			}
		} else {
			if lex.isKeyword("as") {
				lex.nextToken()
			}
			resultName, err = parseFieldName(lex)
			if err != nil {
				return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
			}
		}

		if bf := byFieldsByName[resultName]; bf != nil {
			return nil, fmt.Errorf("the %q is used as 'by' field [%s], so it cannot be used as result name for [%s]", resultName, bf, sf)
		}
		if sfPrev := usedResultNames[resultName]; sfPrev != nil {
			return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
		}
		usedResultNames[resultName] = sf

		funcs = append(funcs, pipeStatsFunc{
			f:          sf,
			iff:        iff,
			resultName: resultName,
		})

		switch {
		case lex.isKeyword("|", ")", ""):
			// The end of the stats pipe.
			ps.funcs = funcs
			return ps, nil
		case lex.isKeyword(","):
			lex.nextToken()
		default:
			return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, sf)
		}
	}
}

// parseStatsFunc parses the stats function located at the current lex token.
//
// The registered parser whose name matches the current token is used for the parsing.
// An error is returned if no registered name matches the token.
func parseStatsFunc(lex *lexer) (statsFunc, error) {
	for name, parse := range getStatsFuncParsers() {
		if lex.isKeyword(name) {
			sf, err := parse(lex)
			if err != nil {
				return nil, fmt.Errorf("cannot parse %q func: %w", name, err)
			}
			return sf, nil
		}
	}
	return nil, fmt.Errorf("unknown stats func %q", lex.token)
}

// statsFuncParsers maps lowercase stats function names to their parsers.
//
// It is initialized by initStatsFuncParsers. Use getStatsFuncParsers for accessing it.
var statsFuncParsers map[string]statsFuncParser
// statsFuncParsersOnce makes sure that statsFuncParsers is initialized only once.
var statsFuncParsersOnce sync.Once

// statsFuncParser parses a single stats function from lex.
//
// parseStatsFunc calls it when the current lex token matches the name the parser is registered under.
type statsFuncParser func(lex *lexer) (statsFunc, error)

// getStatsFuncParsers returns the registry of stats function parsers.
//
// The registry is initialized on the first call.
func getStatsFuncParsers() map[string]statsFuncParser {
	statsFuncParsersOnce.Do(initStatsFuncParsers)

	parsers := statsFuncParsers
	return parsers
}

func initStatsFuncParsers() {
	statsFuncParsers = map[string]statsFuncParser{
		"avg":             parseStatsAvg,
		"count":           parseStatsCount,
		"count_empty":     parseStatsCountEmpty,
		"count_uniq":      parseStatsCountUniq,
		"count_uniq_hash": parseStatsCountUniqHash,
		"histogram":       parseStatsHistogram,
		"json_values":     parseStatsJSONValues,
		"max":             parseStatsMax,
		"median":          parseStatsMedian,
		"min":             parseStatsMin,
		"quantile":        parseStatsQuantile,
		"rate":            parseStatsRate,
		"rate_sum":        parseStatsRateSum,
		"row_any":         parseStatsRowAny,
		"row_max":         parseStatsRowMax,
		"row_min":         parseStatsRowMin,
		"sum":             parseStatsSum,
		"sum_len":         parseStatsSumLen,
		"uniq_values":     parseStatsUniqValues,
		"values":          parseStatsValues,
	}
}

// isStatsFuncName returns true if s is a name of a registered stats function.
//
// The check is case-insensitive.
func isStatsFuncName(s string) bool {
	parser, ok := getStatsFuncParsers()[strings.ToLower(s)]
	return ok && parser != nil
}

// byStatsField represents a single field from the 'by (...)' part of the pipeStats.
//
// It can have either 'name' representation or 'name:bucket' or 'name:bucket offset off' representation,
// where `bucket` and `off` can contain duration, size or numeric value for creating different buckets
// for 'value/bucket'.
type byStatsField struct {
	// name is the canonical name of the field.
	name string

	// bucketSizeStr is string representation of the bucket size.
	//
	// It is empty if the bucket size isn't specified for the field.
	bucketSizeStr string

	// bucketSize is the bucket for grouping the given field values with value/bucketSize calculations.
	//
	// parseByStatsFields leaves it unset for the 'year' and 'month' bucket sizes,
	// which are stored only in bucketSizeStr.
	bucketSize float64

	// bucketOffsetStr is string representation of the offset for bucketSize.
	//
	// It is empty if the offset isn't specified for the field.
	bucketOffsetStr string

	// bucketOffset is the offset for bucketSize
	bucketOffset float64
}

// String returns string representation of bf in one of the following forms:
// 'name', 'name:bucket' or 'name:bucket offset off'.
//
// The offset is included only if the bucket size is set.
func (bf *byStatsField) String() string {
	name := quoteTokenIfNeeded(bf.name)
	if bf.bucketSizeStr == "" {
		return name
	}

	withBucket := name + ":" + bf.bucketSizeStr
	if bf.bucketOffsetStr == "" {
		return withBucket
	}
	return withBucket + " offset " + bf.bucketOffsetStr
}

// hasBucketConfig returns true if bf has non-empty bucket size or non-empty bucket offset.
func (bf *byStatsField) hasBucketConfig() bool {
	return bf.bucketSizeStr != "" || bf.bucketOffsetStr != ""
}

// parseByStatsFields parses the '(field1, ..., fieldN)' list of 'by' fields from lex.
//
// Every field may have the optional ':bucket' suffix, which may be followed by the optional 'offset off' suffix.
// The lex must be located at the opening '('. It is moved past the closing ')' on success.
//
// The trailing comma before the closing ')' and the empty list are allowed.
func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing `(`")
	}

	var fields []*byStatsField
	for {
		// Skip either the opening '(' or the ',' delimiter.
		lex.nextToken()
		if lex.isKeyword(")") {
			lex.nextToken()
			return fields, nil
		}

		name, err := lex.nextCompoundTokenExt([]string{":"})
		if err != nil {
			return nil, fmt.Errorf("cannot parse field name: %w", err)
		}
		name = getCanonicalColumnName(name)
		field := &byStatsField{
			name: name,
		}

		if lex.isKeyword(":") {
			lex.nextToken()

			// Bucket size.
			sizeStr, err := lex.nextCompoundToken()
			if err != nil {
				return nil, fmt.Errorf("cannot parse bucket size for field %q: %w", name, err)
			}
			switch sizeStr {
			case "year", "month":
				// These bucket sizes are stored only in bucketSizeStr; bucketSize remains zero.
			default:
				size, ok := tryParseBucketSize(sizeStr)
				if !ok {
					return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", name, sizeStr)
				}
				field.bucketSize = size
			}
			field.bucketSizeStr = sizeStr

			// Bucket offset.
			if lex.isKeyword("offset") {
				lex.nextToken()

				offsetStr, err := lex.nextCompoundToken()
				if err != nil {
					return nil, fmt.Errorf("cannot parse offset token for %q: %w", name, err)
				}
				offset, ok := tryParseBucketOffset(offsetStr)
				if !ok {
					return nil, fmt.Errorf("cannot parse bucket offset for field %q: %q", name, offsetStr)
				}
				field.bucketOffsetStr = offsetStr
				field.bucketOffset = offset
			}
		}
		fields = append(fields, field)

		if lex.isKeyword(")") {
			lex.nextToken()
			return fields, nil
		}
		if !lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
		}
	}
}

// tryParseBucketOffset tries parsing bucket offset from s.
//
// The following formats are tried in the given order:
//
// - integer or floating-point number: 12345 or 1.2345
// - duration: 1.5s - it is converted to nanoseconds
// - bytes: 1.5KiB
//
// false is returned if s doesn't match any of these formats.
func tryParseBucketOffset(s string) (float64, bool) {
	// Number
	f, ok := tryParseFloat64(s)
	if ok {
		return f, true
	}

	// Duration (1s, 5m, etc.)
	nsecs, ok := tryParseDuration(s)
	if ok {
		return float64(nsecs), true
	}

	// Bytes (KiB, MB, etc.)
	n, ok := tryParseBytes(s)
	if ok {
		return float64(n), true
	}

	return 0, false
}

// tryParseBucketSize tries parsing bucket size from s.
//
// The following formats are tried in the given order:
//
// - named time unit: nanosecond, microsecond, millisecond, second, minute, hour, day or week
// - integer or floating-point number: 12345 or 1.2345
// - duration: 1.5s - it is converted to nanoseconds
// - bytes: 1.5KiB
// - ipv4 mask: /24
//
// The first matching format determines the result. false is returned
// if s doesn't match any format or if the matching format gives a non-positive value.
func tryParseBucketSize(s string) (float64, bool) {
	// Named time unit
	var unitSize float64
	isNamedUnit := true
	switch s {
	case "nanosecond":
		unitSize = 1
	case "microsecond":
		unitSize = nsecsPerMicrosecond
	case "millisecond":
		unitSize = nsecsPerMillisecond
	case "second":
		unitSize = nsecsPerSecond
	case "minute":
		unitSize = nsecsPerMinute
	case "hour":
		unitSize = nsecsPerHour
	case "day":
		unitSize = nsecsPerDay
	case "week":
		unitSize = nsecsPerWeek
	default:
		isNamedUnit = false
	}
	if isNamedUnit {
		return unitSize, true
	}

	// Number
	f, ok := tryParseFloat64(s)
	if ok {
		return f, f > 0
	}

	// Duration (1s, 5m, etc.)
	nsecs, ok := tryParseDuration(s)
	if ok {
		return float64(nsecs), nsecs > 0
	}

	// Bytes (KiB, MB, etc.)
	size, ok := tryParseBytes(s)
	if ok {
		return float64(size), size > 0
	}

	// IPv4 mask (/24)
	mask, ok := tryParseIPv4Mask(s)
	if ok {
		return float64(mask), mask > 0
	}

	return 0, false
}

// parseFieldNamesInParens parses the '(field1, ..., fieldN)' list of field names from lex.
//
// Contrary to parseFieldFiltersInParens, it returns an error if any parsed entry is a wildcard filter.
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
	names, err := parseFieldFiltersInParens(lex)
	if err != nil {
		return nil, err
	}

	for i := range names {
		if name := names[i]; prefixfilter.IsWildcardFilter(name) {
			return nil, fmt.Errorf("the field name %q cannot end with '*'", name)
		}
	}
	return names, nil
}

// parseFieldFiltersInParens parses the '(filter1, ..., filterN)' list of field filters from lex.
//
// The lex must be located at the opening '('. It is moved past the closing ')' on success.
// The trailing comma before the closing ')' and the empty list are allowed,
// while an empty item between commas is rejected.
func parseFieldFiltersInParens(lex *lexer) ([]string, error) {
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing `(`")
	}

	var filters []string
	for {
		// Skip either the opening '(' or the ',' delimiter.
		lex.nextToken()

		switch {
		case lex.isKeyword(")"):
			lex.nextToken()
			return filters, nil
		case lex.isKeyword(","):
			return nil, fmt.Errorf("unexpected `,`")
		}

		filter, err := parseFieldFilter(lex)
		if err != nil {
			return nil, err
		}
		filters = append(filters, filter)

		if lex.isKeyword(")") {
			lex.nextToken()
			return filters, nil
		}
		if !lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
		}
	}
}

// parseFieldName reads a compound token from lex and returns it as a canonical column name.
func parseFieldName(lex *lexer) (string, error) {
	token, err := lex.nextCompoundToken()
	if err != nil {
		return "", err
	}
	return getCanonicalColumnName(token), nil
}

// parseFieldFilter parses a field filter from lex.
//
// The filter is either the standalone '*' or a canonical field name.
// The '*' suffix is appended to the name if the name is followed by '*'
// and lex.isSkippedSpace isn't set.
func parseFieldFilter(lex *lexer) (string, error) {
	if lex.isKeyword("*") {
		lex.nextToken()
		return "*", nil
	}

	token, err := lex.nextCompoundToken()
	if err != nil {
		return "", err
	}
	name := getCanonicalColumnName(token)

	if lex.isSkippedSpace || !lex.isKeyword("*") {
		// There is no '*' suffix after the name.
		return name, nil
	}
	lex.nextToken()
	return name + "*", nil
}

// fieldNamesString returns fields joined with ", ".
//
// Every field is passed through quoteFieldFilterIfNeeded before the join.
func fieldNamesString(fields []string) string {
	quoted := make([]string, 0, len(fields))
	for _, field := range fields {
		quoted = append(quoted, quoteFieldFilterIfNeeded(field))
	}
	return strings.Join(quoted, ", ")
}

// areConstValues returns true if values is non-empty and all its items are identical.
//
// false is returned for empty values.
func areConstValues(values []string) bool {
	if len(values) == 0 {
		return false
	}

	first := values[0]
	for _, v := range values[1:] {
		if v != first {
			return false
		}
	}
	return true
}
